{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Name of the directory under <project path>/Models/ that export_model() exports to.\n",
    "model = 'featurestore'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "from hops import featurestore\n",
    "import tensorflow as tf\n",
    "from tensorflow import keras\n",
    "from tensorflow.keras import layers\n",
    "from hops import experiment\n",
    "from tensorflow.python.keras.callbacks import TensorBoard\n",
    "from hops import tensorboard"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Training and input-pipeline settings shared by the cells below.\n",
    "NUM_EPOCHS = 30  # repeat count given to dataset.repeat() in create_tf_dataset()\n",
    "BATCH_SIZE = 10  # records per batch; also the fixed batch size of the model's first layer\n",
    "LEARNING_RATE = 0.001  # learning rate handed to the Adam optimizer in train_fn()\n",
    "SHUFFLE_BUFFER_SIZE = 10000  # buffer size given to dataset.shuffle()\n",
    "INPUT_SHAPE = 4  # width of the model input; create_tf_dataset() reads four feature columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "def create_tf_dataset():\n",
    "    \"\"\"\n",
    "    Builds the tf.data input pipeline over the feature store training dataset\n",
    "\n",
    "    The TFRecord part files of the training dataset are decoded record by record into\n",
    "    (features, label) pairs, which are then shuffled, batched and repeated.\n",
    "\n",
    "    Returns:\n",
    "        the dataset of (features, label) batches\n",
    "    \"\"\"\n",
    "    training_dataset = \"predict_house_sold_for_dataset\"\n",
    "    feature_columns = [\"avg_house_age\", \"avg_house_size\", \"avg_house_worth\", \"avg_num_bidders\"]\n",
    "    label_column = \"avg_sold_for\"\n",
    "\n",
    "    part_files = tf.gfile.Glob(featurestore.get_training_dataset_path(training_dataset) + \"/part-r-*\")\n",
    "    records = tf.data.TFRecordDataset(part_files)\n",
    "    record_schema = featurestore.get_training_dataset_tf_record_schema(training_dataset)\n",
    "\n",
    "    def _parse_record(serialized_record):\n",
    "        # Decode one serialized example with the schema obtained from the feature store.\n",
    "        parsed = tf.parse_single_example(serialized_record, record_schema)\n",
    "        features = [parsed[column] for column in feature_columns]\n",
    "        label = [tf.cast(parsed[label_column], tf.float32)]\n",
    "        return features, label\n",
    "\n",
    "    decoded = records.map(_parse_record)\n",
    "    shuffled = decoded.shuffle(SHUFFLE_BUFFER_SIZE)\n",
    "    batched = shuffled.batch(BATCH_SIZE)\n",
    "    return batched.repeat(NUM_EPOCHS)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "def create_model():\n",
    "    \"\"\"\n",
    "    Builds the feed-forward network: two ReLU layers of 64 units and one output unit\n",
    "\n",
    "    The first layer fixes the input to INPUT_SHAPE features and BATCH_SIZE records per batch.\n",
    "\n",
    "    Returns:\n",
    "        the uncompiled tf.keras Sequential model\n",
    "    \"\"\"\n",
    "    network = tf.keras.Sequential()\n",
    "    network.add(layers.Dense(64, activation='relu', input_shape=(INPUT_SHAPE,), batch_size=BATCH_SIZE))\n",
    "    network.add(layers.Dense(64, activation='relu'))\n",
    "    network.add(layers.Dense(1))\n",
    "    return network"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "def export_model(classifier):\n",
    "    \"\"\"\n",
    "    Exports the trained model as a SavedModel under <project path>/Models/<model>\n",
    "\n",
    "    The export directory is created first if it does not exist. Its last path component\n",
    "    is read from the notebook-level variable `model`.\n",
    "\n",
    "    Args:\n",
    "        :classifier: the trained estimator to export (must provide export_savedmodel)\n",
    "    \"\"\"\n",
    "    from hops import hdfs\n",
    "\n",
    "    def _serving_input_receiver_fn():\n",
    "        # The receiver key must be the same as the input key used when building\n",
    "        # the request for prediction.\n",
    "        receiver_tensors = {\"dense_input\": tf.placeholder(dtype=tf.float32, shape=[1, INPUT_SHAPE])}\n",
    "        return tf.estimator.export.ServingInputReceiver(receiver_tensors, receiver_tensors)\n",
    "\n",
    "    export_path = hdfs.project_path()+\"/Models/\" + model\n",
    "    if not hdfs.exists(export_path):\n",
    "        hdfs.mkdir(export_path)\n",
    "    classifier.export_savedmodel(export_path, _serving_input_receiver_fn)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "def train_fn():\n",
    "    \"\"\"\n",
    "    Trains the Keras model as a tf.estimator and exports the trained estimator\n",
    "\n",
    "    Compiles the network from create_model(), converts it to an estimator that writes its\n",
    "    checkpoints and summaries to tensorboard.logdir(), runs train_and_evaluate with\n",
    "    create_tf_dataset as input function and finally calls export_model().\n",
    "    \"\"\"\n",
    "    import tensorflow as tf\n",
    "    from hops import tensorboard\n",
    "\n",
    "    logdir = tensorboard.logdir()\n",
    "    # Named keras_model so it is not confused with the notebook-level string `model`\n",
    "    # that export_model() reads.\n",
    "    keras_model = create_model()\n",
    "    # The network is fitted with an MSE loss on a float label, so report mean absolute error;\n",
    "    # 'accuracy' is a classification metric and carries no meaning here.\n",
    "    keras_model.compile(optimizer=tf.train.AdamOptimizer(LEARNING_RATE), loss='mse', metrics=['mae'])\n",
    "    run_config = tf.contrib.learn.RunConfig(\n",
    "        model_dir=logdir,\n",
    "        save_checkpoints_steps=10,\n",
    "        save_summary_steps=5,\n",
    "        log_step_count_steps=10)\n",
    "    keras_estimator = tf.keras.estimator.model_to_estimator(\n",
    "        keras_model=keras_model, config=run_config, model_dir=logdir)\n",
    "    # NOTE: evaluation uses the same input function as training, so the evaluation metrics\n",
    "    # are computed on the training data.\n",
    "    tf.estimator.train_and_evaluate(\n",
    "        keras_estimator,\n",
    "        train_spec=tf.estimator.TrainSpec(input_fn=create_tf_dataset),\n",
    "        eval_spec=tf.estimator.EvalSpec(input_fn=create_tf_dataset))\n",
    "    export_model(keras_estimator)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Finished Experiment \n",
      "\n",
      "'hdfs://10.0.2.15:8020/Projects/demo_deep_learning_admin000/Experiments/application_1561468620886_0012/launcher/run.1'"
     ]
    }
   ],
   "source": [
    "# Launch train_fn through the hops experiment API.\n",
    "experiment.launch(\n",
    "    train_fn,\n",
    "    name=\"feature store quick start example\",\n",
    "    local_logdir=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "",
   "name": "pysparkkernel"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 2
   },
   "mimetype": "text/x-python",
   "name": "pyspark",
   "pygments_lexer": "python2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
